{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Class Imbalance\n", "\n", "Class imbalance arises when the number of classes in the data set is imbalanced. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "%load_ext autoreload\n", "%autoreload 2\n", "import ipywidgets\n", "from ipywidgets import interact, interactive, interact_manual\n", "import IPython\n", "from matplotlib import rcParams\n", "rcParams['figure.figsize'] = (16, 8)\n", "rcParams['font.size'] = 16\n", "\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from utilities.load_data import linear_separable_data, circular_separable_data\n", "from utilities import plot_helpers \n", "from sklearn.svm import LinearSVC\n", "from sklearn.linear_model import SGDClassifier\n", "import sklearn.metrics as metrics\n", "# from sklearn.metrics import plot_roc_curve\n", "# impofrt sklearn.warnings ConvergenceWarning as ConvergenceWarning\n", "\n", "import warnings\n", "# warnings.simplefilter(action='ignore', category=FutureWarning)\n", "warnings.filterwarnings(action='ignore')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define usual Metrics to evaluate classifiers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def build_confusion_matrix(pred_label, true_label, num_classes=2):\n", " \"\"\"This works for predictions in {0, 1, ..., Num Classes}.\"\"\"\n", " confusion_matrix = np.zeros((num_classes, num_classes))\n", " for row in range(num_classes):\n", " for col in range(num_classes):\n", " confusion_matrix[row, col] = np.sum(np.bitwise_and(pred_label == row, true_label == col))\n", " return confusion_matrix\n", "\n", "def accuracy(pred_label, true_label):\n", " # Option 1, works for any prediction output. \n", " total = len(true_label)\n", " correct = np.sum(pred_label == true_label)\n", " acc = correct / total\n", " \n", " # Option 2, works for prediction in {-1, 1}. \n", " confusion_matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)\n", " acc2 = np.sum(np.diag(confusion_matrix)) / np.sum(confusion_matrix)\n", " \n", " assert acc == acc2\n", " return acc\n", " \n", "\n", "def precision(pred_label, true_label):\n", " # Option 1, works for prediction in {-1, 1}. \n", " true_positive = np.sum(np.bitwise_and(true_label == 1, pred_label == 1))\n", " false_positive = np.sum(np.bitwise_and(true_label == -1, pred_label == 1))\n", " \n", " total_pred_positive = true_positive + false_positive\n", " assert total_pred_positive == np.sum(pred_label == 1)\n", " if total_pred_positive == 0:\n", " return 0\n", " \n", " prec1 = true_positive / total_pred_positive\n", " \n", " # Option 2, works for prediction in {-1, 1}. \n", " confusion_matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)\n", " true_positive = confusion_matrix[1, 1]\n", " total_pred_positive = np.sum(confusion_matrix[1])\n", " prec2 = true_positive / total_pred_positive\n", "\n", " assert prec1 == prec2 \n", " return prec1\n", "\n", "\n", "def recall(pred_label, true_label):\n", " # Option 1, works for prediction in {-1, 1}. \n", " true_positive = np.sum(np.bitwise_and(true_label == 1, pred_label == 1))\n", " false_negative = np.sum(np.bitwise_and(true_label == 1, pred_label == -1))\n", " \n", " total_true_positive = true_positive + false_negative\n", " assert total_true_positive == np.sum(true_label == 1)\n", " if total_true_positive == 0:\n", " return 0\n", " \n", " rec1 = true_positive / total_true_positive\n", " \n", " # Option 2, works for prediction in {-1, 1}. \n", " confusion_matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)\n", " true_positive = confusion_matrix[1, 1]\n", " total_true_positive = np.sum(confusion_matrix[:, 1])\n", " rec2 = true_positive / total_true_positive\n", "\n", " assert rec1 == rec2 \n", " return rec1\n", "\n", "def f1_score(pred_label, true_label):\n", " precision_ = precision(pred_label, true_label)\n", " recall_ = recall(pred_label, true_label)\n", " if precision_ == 0 or recall_ == 0:\n", " return 0 \n", " else:\n", " return 2 / (1 / precision_ + 1 / recall_)\n", " \n", "def make_meshgrid(x, y, h=.02):\n", " x_min, x_max = -2, 3\n", " y_min, y_max = -2, 3\n", " xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))\n", " return xx, yy\n", " \n", "def plot_imbalanced(X, Y, classifier=None, Xtest=None, Ytest=None, fig=None):\n", " if Xtest is None:\n", " if fig is None:\n", " fig = plt.subplot(111)\n", " opt = {'marker': 'r*', 'label': '+'}\n", " plot_helpers.plot_data(X[np.where(Y == 1)[0], 0], X[np.where(Y == 1)[0], 1], fig=fig, options=opt)\n", " opt = {'marker': 'bs', 'label': '-'}\n", " plot_helpers.plot_data(X[np.where(Y == -1)[0], 0], X[np.where(Y == -1)[0], 1], fig=fig, options=opt)\n", "\n", " if classifier is not None:\n", " xx, yy = make_meshgrid(X[:, 0], X[:, 1])\n", " Z = classifier.predict(np.c_[xx.ravel(), yy.ravel()])\n", " Z = Z.reshape(xx.shape)\n", " out = plt.contourf(xx, yy, Z, colors=['blue', 'red'], alpha=0.3)\n", "\n", " plt.xlim([-2, 3])\n", " plt.ylim([-2, 3])\n", " \n", " else:\n", " fig, ax = plt.subplots(1, 2)\n", " plt.sca(ax[0])\n", " plot_imbalanced(X, Y, classifier, fig=ax[0])\n", " plt.title('Train Data')\n", " \n", " plt.sca(ax[1])\n", " plot_imbalanced(Xtest, Ytest, classifier, fig=ax[1])\n", " plt.title('Test Data')\n", "\n", "\n", "def print_metrics(pred_label, true_label, pred_score):\n", " acc = accuracy(pred_label, true_label)\n", " prec = precision(pred_label, true_label)\n", " rec = recall(pred_label, true_label)\n", " f1 = f1_score(pred_label, true_label)\n", " confusion_matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)\n", " \n", " fpr, tpr, thresholds = metrics.roc_curve(true_label, pred_score)\n", " auc_ = metrics.auc(fpr, tpr)\n", " \n", " print('Accuracy: {:.2f}. Precision: {:.2f}. Recall: {:.2f}. F1-Score: {:.2f}. AUC: {:.2f}'.format(acc, prec, rec, f1, auc_))\n", " print('Confusion Matrix: \\n', confusion_matrix)\n", " \n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def generate_data(num_positive, num_negative, noise):\n", " X, Y = linear_separable_data(num_positive, num_negative, noise=noise, dim=2)\n", " train_idx = np.random.choice(total, int(0.8 * total), replace=False)\n", " test_idx = [i for i in range(total) if i not in train_idx]\n", " Xtrain, Ytrain = X[train_idx], Y[train_idx]\n", " Xtest, Ytest = X[test_idx], Y[test_idx]\n", " \n", " return X, Y, Xtrain, Ytrain, Xtest, Ytest\n", "\n", "num_positive = 10 \n", "num_negative = 100\n", "total = num_positive + num_negative\n", "noise = 0.5\n", "np.random.seed(0)\n", "X, Y, Xtrain, Ytrain, Xtest, Ytest = generate_data(num_positive, num_negative, noise)\n", "plot_imbalanced(X, Y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Option 0: Vanilla Classifier" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def vanilla(X, Y, classifier, Xtest=None, Ytest=None):\n", " classifier.fit(X[:, :2], Y)\n", " plot_imbalanced(X, Y, classifier, Xtest, Ytest)\n", " return classifier\n", "\n", "classifier = vanilla(Xtrain, Ytrain, LinearSVC(), Xtest, Ytest)\n", "print('Train')\n", "Ypred = classifier.predict(Xtrain[:, :2])\n", "Spred = classifier.decision_function(Xtrain[:, :2])\n", "print_metrics(Ypred, Ytrain, Spred)\n", "\n", "\n", "print('Test')\n", "Ypred = classifier.predict(Xtest[:, :2])\n", "Spred = classifier.decision_function(Xtest[:, :2])\n", "print_metrics(Ypred, Ytest, Spred)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Option 1: Downsampling majority class" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def downsampling(X, Y, classifier, Xtest=None, Ytest=None):\n", " pos_idx = np.where(Y == 1)[0]\n", " neg_idx = np.where(Y == -1)[0]\n", " total_positives = len(pos_idx)\n", " total_negatives = len(neg_idx)\n", "\n", " down_idx = np.random.choice(np.arange(total_negatives), total_positives, replace=False)\n", " down_train_idx = np.concatenate((pos_idx, neg_idx[down_idx]))\n", "\n", " X_down, Y_down = X[down_train_idx], Y[down_train_idx]\n", " \n", " assert len(down_idx) == total_positives\n", " classifier.fit(X_down[:, :2], Y_down) # Use only the first two features as the classifier fits a bias term.\n", " \n", " plot_imbalanced(X_down, Y_down, classifier, Xtest, Ytest)\n", " return classifier \n", "\n", "classifier = downsampling(Xtrain, Ytrain, LinearSVC(), Xtest, Ytest)\n", "print('Train')\n", "Ypred = classifier.predict(Xtrain[:, :2])\n", "Spred = classifier.decision_function(Xtrain[:, :2])\n", "print_metrics(Ypred, Ytrain, Spred)\n", "\n", "\n", "print('Test')\n", "Ypred = classifier.predict(Xtest[:, :2])\n", "Spred = classifier.decision_function(Xtest[:, :2])\n", "print_metrics(Ypred, Ytest, Spred)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Option 2: Upsampling minority class" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def upsampling(X, Y, classifier, up_sampling_noise=1e-1, Xtest=None, Ytest=None):\n", " pos_idx = np.where(Y == 1)[0]\n", " neg_idx = np.where(Y == -1)[0]\n", " total_positives = len(pos_idx)\n", " total_negatives = len(neg_idx)\n", "\n", " up_idx = np.random.choice(np.arange(total_positives), total_negatives, replace=True)\n", " up_train_idx = np.concatenate((pos_idx[up_idx], neg_idx))\n", " assert len(up_idx) == total_negatives\n", "\n", " X_up, Y_up = X[up_train_idx], Y[up_train_idx]\n", " X_up[:total_negatives, :2] += up_sampling_noise * np.random.randn(total_negatives, 2) # perturb the up-sampling\n", "\n", " classifier.fit(X_up[:, :2], Y_up) # Use only the first two features as the classifier fits a bias term.\n", " \n", " plot_imbalanced(X_up, Y_up, classifier, Xtest, Ytest)\n", " return classifier \n", "\n", "up_sampling_noise=1e-1\n", "classifier = upsampling(Xtrain, Ytrain, LinearSVC(), up_sampling_noise, Xtest, Ytest)\n", "print('Train')\n", "Ypred = classifier.predict(Xtrain[:, :2])\n", "Spred = classifier.decision_function(Xtrain[:, :2])\n", "print_metrics(Ypred, Ytrain, Spred)\n", "\n", "\n", "print('Test')\n", "Ypred = classifier.predict(Xtest[:, :2])\n", "Spred = classifier.decision_function(Xtest[:, :2])\n", "print_metrics(Ypred, Ytest, Spred)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Option 3: Cost-Sensitive Classification" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def cost_sensitive(X, Y, classifier, class_ratio, Xtest=None, Ytest=None):\n", " classifier.class_weight = {-1: 1, 1: class_ratio}\n", " \n", " classifier.fit(X[:, :2], Y)\n", " plot_imbalanced(X, Y, classifier, Xtest, Ytest)\n", " return classifier\n", "\n", "class_ratio = num_negative / num_positive\n", "classifier = cost_sensitive(Xtrain, Ytrain, LinearSVC(), class_ratio, Xtest, Ytest)\n", "\n", "print('Train')\n", "Ypred = classifier.predict(Xtrain[:, :2])\n", "Spred = classifier.decision_function(Xtrain[:, :2])\n", "print_metrics(Ypred, Ytrain, Spred)\n", "\n", "\n", "print('Test')\n", "Ypred = classifier.predict(Xtest[:, :2])\n", "Spred = classifier.decision_function(Xtest[:, :2])\n", "print_metrics(Ypred, Ytest, Spred)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Compare All Methods" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "up_sampling_noise = 1e-1\n", "\n", "def imbalanced_learn(method, noise, classifier, weight_ratio):\n", " np.random.seed(0)\n", " X, Y, Xtrain, Ytrain, Xtest, Ytest = generate_data(num_positive=20, num_negative=200, noise=noise)\n", "\n", " if classifier.lower() == 'svm':\n", " classifier = LinearSVC()\n", " elif classifier.lower() == 'perceptron':\n", " classifier = SGDClassifier(loss='perceptron', random_state=1, max_iter=1000)\n", " else:\n", " raise ValueError('Not Implemented classifer.')\n", " \n", " if method.lower() == 'vanilla':\n", " classifier = vanilla(Xtrain, Ytrain, classifier, Xtest, Ytest)\n", " elif method.lower() == 'downsampling':\n", " classifier = downsampling(Xtrain, Ytrain, classifier, Xtest, Ytest)\n", " elif method.lower() == 'upsampling': \n", " classifier = upsampling(Xtrain, Ytrain, classifier, up_sampling_noise, Xtest, Ytest)\n", " elif method.lower() == 'cost-sensitive':\n", " classifier = cost_sensitive(Xtrain, Ytrain, classifier, weight_ratio, Xtest, Ytest)\n", " \n", " \n", " print('Train')\n", " Ypred = classifier.predict(Xtrain[:, :2])\n", " Spred = classifier.decision_function(Xtrain[:, :2])\n", " print_metrics(Ypred, Ytrain, Spred)\n", "\n", " \n", " print('Test')\n", " Ypred = classifier.predict(Xtest[:, :2])\n", " Spred = classifier.decision_function(Xtest[:, :2])\n", " print_metrics(Ypred, Ytest, Spred)\n", " \n", "\n", "noise_widget = ipywidgets.FloatSlider(value=0.6, min=0, max=1, step=0.1, continuous_update=False)\n", "weight_ratio_widget = ipywidgets.FloatLogSlider(value=10, min=-2, max=3, continuous_update=False)\n", "interact(imbalanced_learn, method=['vanilla', 'downsampling', 'upsampling', 'cost-sensitive'], \n", " noise=noise_widget, classifier=['perceptron', 'svm'], weight_ratio=weight_ratio_widget);\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 2 }